package com.github.ghsea.framework.job.core;

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.github.ghsea.framework.job.common.Endpoint;
import com.github.ghsea.framework.job.common.EndpointChangedListener;
import com.github.ghsea.framework.job.common.EndpointZkNodeData;
import com.github.ghsea.framework.job.common.support.IpUtils;
import com.github.ghsea.framework.job.support.ServerConfig;
import com.github.ghsea.framework.job.support.ZkPath;
import com.github.ghsea.framework.zkclient.ZkClient;

/**
 * Spring lifecycle bean that boots a job server.
 * <p>
 * After its properties are set it loads the {@link ServerConfig}, obtains the
 * {@link ZkClient} and builds the {@link JobServer}. When the application
 * context has been refreshed it registers this server as an ephemeral node
 * under the servers path, reads and watches the other registered servers and
 * finally starts the {@link JobServer}. On bean destruction the ZooKeeper
 * client is closed.
 * <p>
 * The ZooKeeper client and the endpoint listener are held in static fields
 * and exposed through static getters, so they are shared by every instance of
 * this class.
 */
public class JobServerBootstrap
		implements InitializingBean, DisposableBean, ApplicationListener<ContextRefreshedEvent> {

	private static final Logger log = LoggerFactory.getLogger(JobServerBootstrap.class);

	private static ZkClient zkClient;

	private static EndpointChangedListener serverNodeChangedListener = new EndpointChangedListener();

	private final String quartzConfigFileName;
	private final String serverConfigFileName;

	private ServerConfig serverConfig;

	private JobServer jobServer;

	/**
	 * Set once the server has been registered and started successfully, so
	 * that further refresh events do not register or start it again.
	 */
	private boolean started;

	/**
	 * @param quartzConfigFileName
	 *            name of the Quartz configuration file handed to the
	 *            {@link JobServer}
	 * @param serverConfigFileName
	 *            name of the file the {@link ServerConfig} is loaded from
	 */
	public JobServerBootstrap(String quartzConfigFileName, String serverConfigFileName) {
		this.quartzConfigFileName = quartzConfigFileName;
		this.serverConfigFileName = serverConfigFileName;
	}

	/**
	 * Initializes the ZooKeeper nodes, registers this server's
	 * {@code ip:port} as an ephemeral node, starts watching the registered
	 * servers and starts the job server.
	 * <p>
	 * A listener may receive this event more than once (for example when
	 * child contexts are refreshed), so the work is only done until it has
	 * succeeded once. Failures are logged and not propagated; a later refresh
	 * event will retry.
	 *
	 * @param event
	 *            the context refreshed event (its content is not inspected)
	 */
	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if (started) {
			return;
		}
		try {
			registerServer();
			getAndWatchAllServers();

			this.jobServer.start();
			started = true;
		} catch (Exception e) {
			log.error("Failed to register and start job server: " + e.getMessage(), e);
		}
	}

	/**
	 * Installs a fresh {@link EndpointChangedListener} as the watcher of the
	 * servers path and initializes it with the endpoints that are currently
	 * registered there.
	 * <p>
	 * Reading the data of a single server node is best-effort: a node whose
	 * data cannot be read is logged and skipped instead of aborting the whole
	 * start-up.
	 *
	 * @throws Exception
	 *             if the children of the servers path cannot be listed
	 */
	private void getAndWatchAllServers() throws Exception {
		String zkServerPath = ZkPath.getPathOfServers();
		serverNodeChangedListener = new EndpointChangedListener();
		List<String> serverNodes = zkClient.getChildrenAndWatch(zkServerPath, serverNodeChangedListener);
		List<Endpoint> endpoints = new ArrayList<>(serverNodes.size());
		for (String serverNode : serverNodes) {
			String nodePath = ZKPaths.makePath(zkServerPath, serverNode);
			try {
				EndpointZkNodeData data = (EndpointZkNodeData) zkClient.getData(nodePath);
				if (data == null) {
					log.warn("No data found on server node {}, skipping it", nodePath);
					continue;
				}
				endpoints.add(Endpoint.build(serverNode, data.getBaseAccessUrl()));
			} catch (Exception e) {
				log.warn("Failed to read server node " + nodePath + ", skipping it", e);
			}
		}
		serverNodeChangedListener.init(endpoints);
	}

	/**
	 * Creates the servers path if it does not exist yet and (re-)creates the
	 * ephemeral node {@code <servers path>/<local ip>:<port>} carrying this
	 * server's base access URL.
	 *
	 * @throws Exception
	 *             if any ZooKeeper operation fails
	 */
	private void registerServer() throws Exception {
		String zkServerPath = ZkPath.getPathOfServers();
		if (!zkClient.exist(zkServerPath)) {
			zkClient.create(zkServerPath);
		}

		String ipAndPort = IpUtils.getLocalIp() + ":" + serverConfig.getPort();
		String serverZkPath = ZKPaths.makePath(zkServerPath, ipAndPort);
		// Drop a node that is already present under the same ip:port before
		// creating the ephemeral node again.
		if (zkClient.exist(serverZkPath)) {
			zkClient.delete(serverZkPath);
		}
		EndpointZkNodeData nodeData = new EndpointZkNodeData();
		nodeData.setBaseAccessUrl(serverConfig.getBaseAccessUrl());
		zkClient.createEphemeral(serverZkPath, nodeData, true);
	}

	/**
	 * Loads the server configuration, obtains the ZooKeeper client for the
	 * configured ZooKeeper servers and creates the (not yet started)
	 * {@link JobServer}.
	 *
	 * @throws Exception
	 *             if the configuration, the client or the job server cannot
	 *             be created
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		serverConfig = new ServerConfig(serverConfigFileName);
		zkClient = ZkClient.getInstance(serverConfig.getZkServers());

		jobServer = new JobServer(serverConfig, quartzConfigFileName);
	}

	/**
	 * Closes the ZooKeeper client, if one was obtained.
	 *
	 * @throws Exception
	 *             if closing the client fails
	 */
	@Override
	public void destroy() throws Exception {
		if (zkClient != null) {
			zkClient.close();
		}
	}

	/**
	 * @return the listener currently tracking the registered servers; a new
	 *         instance is installed each time the servers are (re-)read on
	 *         context refresh
	 */
	public static EndpointChangedListener getServerNodeChangedListener() {
		return serverNodeChangedListener;
	}

	/**
	 * @return the shared ZooKeeper client, or {@code null} if no instance of
	 *         this class has been initialized yet
	 */
	public static ZkClient getZkClient() {
		return zkClient;
	}
}
